# Clear memory
rm(list = ls())

# install.packages("arrow", repos = c("https://apache.r-universe.dev"))

# Load packages
pacman::p_load(
  tidyverse, haven, tidylog, janitor, magrittr, RSQLite, DBI, maps, data.table, readxl, arrow
)
`%notin%` <- Negate(`%in%`)

# https://www.baruch.cuny.edu/confluence/plugins/servlet/mobile#content/view/34572939
con <- dbConnect(RSQLite::SQLite(), "raw/IRS/irs_migration_county.sqlite")

# =====================================
# for estimation
# =====================================

fips_concord_o <- maps::state.fips %>%
  select(fips, polyname, abb) %>%
  mutate(polyname = str_extract(polyname, "([^:]+)")) %>%
  distinct() %>%
  add_row(fips = 2, polyname = "alaska", abb = "ak") %>%
  add_row(fips = 15, polyname = "hawaii", abb = "hi") %>%
  mutate(fips = str_pad(fips, 2, "left", "0")) %>%
  rename(
    origin_state = polyname,
    origin = fips,
    origin_abb = abb
  ) %>% 
  mutate(origin_abb = tolower(origin_abb))

fips_concord_d <- fips_concord_o %>%
  rename(
    destination_state = origin_state,
    destination = origin,
    destination_abb = origin_abb
  )

dataframes <- c(paste0("outflow_199", 0:8, "_9", 1:9),
       "outflow_1999_00",
  paste0("outflow_200", 0:8, "_0", 1:9),
  "outflow_2009_10",
  paste0("outflow_201", 0:7, "_1", 1:8)
)

migration <- map_dfr(dataframes, function(df_in){
  
  migration_in <- dbReadTable(con, df_in) %>% 
    as_tibble() %>% 
    select(origin, destination, returns) %>% 
    mutate(destination = ifelse(destination == "63050", origin, destination)) %>% 
    mutate(year = as.numeric(substr(df_in, 9, 12))) %>% 
    arrange(origin, destination) %>% 
    mutate(origin_st = substr(origin, 1, 2)) %>% 
    mutate(destination_st = substr(destination, 1, 2)) %>% 
    filter(destination_st != "63" & destination_st != "57" & 
             origin_st != "11" & destination_st != "11" & origin_st != "57"
           & substr(origin, 3, 5) != "000" & substr(destination, 3, 5) != "000") %>% 
    group_by(origin, destination, year) %>% 
    summarise(migration = sum(returns)) %>% 
    ungroup()
  
  migration_in <- expand.grid(
    origin = unique(migration_in$origin),
    destination = unique(migration_in$origin),
    year = min(migration_in$year)
  ) %>% 
    left_join(migration_in, by = c("origin", "destination", "year"))
  
  # Insert 0s if we don't observe migration
  migration_in[is.na(migration_in)] <- 0
  
  migration_in <- migration_in %>% 
    group_by(origin, year) %>% 
    mutate(initial_pop = sum(migration, na.rm = T)) %>% 
    ungroup() %>% 
    mutate(share = migration/initial_pop) %>% 
    mutate(share = ifelse(is.na(share), 0, share)) %>% 
    select(origin, destination, year, share, initial_pop)
  
  own_df <- migration_in %>% 
    filter(origin == destination) %>% 
    rename(initial_own_share = share) %>% 
    select(-destination) %>% 
    distinct()
  
  migration_in <- migration_in %>% 
    inner_join(own_df) %>% 
    mutate(origin_st = substr(origin, 1, 2)) %>% 
    mutate(destination_st = substr(destination, 1, 2)) %>% 
    filter(destination_st != "63" & destination_st != "57" & origin_st != "11" & destination_st != "11" & origin_st != "57") %>%
    group_by(origin, year) %>% 
    mutate(no_mig = sum(share) < .1) %>% 
    ungroup() %>% 
    mutate(share = ifelse(no_mig & origin == destination, 1, share)) # if we don't observe a county, set out migration = 1
  
  write_parquet(migration_in, paste0("data/IRS/migration_shares_", as.numeric(substr(df_in, 9, 12)), ".pq"))
}
) 

migration_shares <- map_dfr(
  paste0("data/IRS/migration_shares_", as.numeric(substr(dataframes, 9, 12)), ".pq"), 
  read_parquet
  )

arrow::write_parquet(migration_shares, "data/IRS/migration_shares_full.pq")


for (year in as.numeric(substr(dataframes, 9, 12))) {
  filename = paste0("data/IRS/migration_shares_", year , ".pq")
  print(filename)
  file.remove(filename)
}

